package com.xiaofan.hotitems_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{FieldExpression, _}
import org.apache.flink.types.Row


/**
 * Real-time hot-item statistics, implemented purely with Flink SQL.
 *
 * The job reads user-behavior records from a CSV text file, keeps the rows whose behavior is
 * `pv`, counts them per item over a hopping event-time window (one hour long, advancing every
 * five minutes) and prints, for each window end, the rows ranked in the top five by count.
 *
 * Usage: `HotItemsWithSql [inputPath]`. When no path is given the built-in default is used.
 */
object HotItemsWithSql {

  /** Input file read when no path is supplied on the command line. */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv"

  /**
   * Top-N query over the registered view `dataTable`.
   *
   * Innermost select: `pv` count per (itemId, hopping window).
   * Middle select: ranks the counts inside each window end, highest first.
   * Outer select: keeps the first five ranks of every window.
   */
  private val TopItemsQuery: String =
    """
      |select *
      |from (
      |  select
      |     *,
      |     row_number()
      |       over (partition by windowEnd order by cnt desc)
      |       as row_num
      |     from (
      |       select
      |         itemId,
      |         hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd,
      |         count(itemId) as cnt
      |       from dataTable
      |       where behavior = 'pv'
      |       group by
      |       itemId,
      |       hop(ts, interval '5' minute, interval '1' hour)
      |     )
      |  )
      |where row_num <= 5
      |""".stripMargin

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional command-line arguments; `args(0)`, when present and non-empty, is the
   *             path of the CSV file to read, otherwise [[DefaultInputPath]] is used
   */
  def main(args: Array[String]): Unit = {

    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Let the caller choose the input file instead of depending on one machine's directory layout.
    val inputPath: String = args.headOption.filter(_.nonEmpty).getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to carry five comma-separated fields:
    // userId, itemId, categoryId, behavior, timestamp.
    val dataStream: DataStream[UserBehavior] = inputStream
      .map {
        line => {
          val fields: Array[String] = line.split(",")
          UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
        }
      }
      // NOTE(review): the factor 1000 presumably converts seconds to milliseconds - confirm
      // against the data set. Ascending timestamps also assume the file is ordered by time.
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Expose only the columns the query needs; the record timestamp becomes the rowtime column `ts`.
    val dataTable: Table =
      tabEnv.fromDataStream(dataStream, $"itemId", $"behavior", $"timestamp".rowtime() as "ts")

    // Pure SQL implementation: register the table under the name the query refers to.
    tabEnv.createTemporaryView("dataTable", dataTable)
    val resultTable: Table = tabEnv.sqlQuery(TopItemsQuery)

    // The result is converted to a retract stream, whose elements pair each row with an
    // add/retract flag, and printed to stdout.
    tabEnv.toRetractStream[Row](resultTable).print()

    env.execute("sql hot items test")

  }
}


























